# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import json
from json import JSONDecodeError, JSONDecoder
from typing import Any

import partial_json_parser
from openai.types.responses import (
    FunctionTool,
    ToolChoiceFunction,
)
from openai.types.responses.tool import Tool
from partial_json_parser.core.options import Allow

from vllm.entrypoints.openai.protocol import (
    ChatCompletionNamedToolChoiceParam,
    ChatCompletionToolsParam,
)


def find_common_prefix(s1: str, s2: str) -> str:
    """
    Finds a common prefix that is shared between two strings, if there is one.
    Order of arguments is NOT important.

    This function is provided as a UTILITY for extracting information from JSON
    generated by partial_json_parser, to help in ensuring that the right tokens
    are returned in streaming, so that close-quotes, close-brackets and
    close-braces are not returned prematurely.

    e.g. find_common_prefix('{"fruit": "ap"}', '{"fruit": "apple"}') ->
    '{"fruit": "ap'
    """
    prefix = ""
    min_length = min(len(s1), len(s2))
    for i in range(0, min_length):
        if s1[i] == s2[i]:
            prefix += s1[i]
        else:
            break
    return prefix


def find_common_suffix(s1: str, s2: str) -> str:
    """
    Finds a common suffix shared between two strings, if there is one. Order of
    arguments is NOT important.
    Stops when the suffix ends OR it hits an alphanumeric character

    e.g. find_common_suffix('{"fruit": "ap"}', '{"fruit": "apple"}') -> '"}'
    """
    suffix = ""
    min_length = min(len(s1), len(s2))
    for i in range(1, min_length + 1):
        if s1[-i] == s2[-i] and not s1[-i].isalnum():
            suffix = s1[-i] + suffix
        else:
            break
    return suffix


def extract_intermediate_diff(curr: str, old: str) -> str:
    """
    Given two strings, extract the difference in the middle between two strings
    that are known to have a common prefix and/or suffix.

    This function is provided as a UTILITY for extracting information from JSON
    generated by partial_json_parser, to help in ensuring that the right tokens
    are returned in streaming, so that close-quotes, close-brackets and
    close-braces are not returned prematurely. The order of arguments IS
    important - the new version of the partially-parsed JSON must be the first
    argument, and the secnod argument must be from the previous generation.

    What it returns, is tokens that should be streamed to the client.

    e.g. extract_intermediate_diff('{"fruit": "apple"}', '{"fruit": "ap"}')
        -> 'ple'

    """
    suffix = find_common_suffix(curr, old)

    old = old[::-1].replace(suffix[::-1], "", 1)[::-1]
    prefix = find_common_prefix(curr, old)
    diff = curr
    if len(suffix):
        diff = diff[::-1].replace(suffix[::-1], "", 1)[::-1]

    if len(prefix):
        # replace the prefix only once in case it's mirrored
        diff = diff.replace(prefix, "", 1)

    return diff


def find_all_indices(string: str, substring: str) -> list[int]:
    """
    Find all (starting) indices of a substring in a given string. Useful for
    tool call extraction
    """
    indices = []
    index = -1
    while True:
        index = string.find(substring, index + 1)
        if index == -1:
            break
        indices.append(index)
    return indices


# partial_json_parser doesn't support extra data and
# JSONDecoder.raw_decode doesn't support partial JSON
def partial_json_loads(input_str: str, flags: Allow) -> tuple[Any, int]:
    try:
        return (partial_json_parser.loads(input_str, flags), len(input_str))
    except JSONDecodeError as e:
        if "Extra data" in e.msg:
            dec = JSONDecoder()
            return dec.raw_decode(input_str)
        raise


def is_complete_json(input_str: str) -> bool:
    try:
        json.loads(input_str)
        return True
    except JSONDecodeError:
        return False


def consume_space(i: int, s: str) -> int:
    while i < len(s) and s[i].isspace():
        i += 1
    return i


def _extract_tool_info(
    tool: Tool | ChatCompletionToolsParam,
) -> tuple[str, dict[str, Any] | None]:
    if isinstance(tool, FunctionTool):
        return tool.name, tool.parameters
    elif isinstance(tool, ChatCompletionToolsParam):
        return tool.function.name, tool.function.parameters
    else:
        raise TypeError(f"Unsupported tool type: {type(tool)}")


def _get_tool_schema_from_tool(tool: Tool | ChatCompletionToolsParam) -> dict:
    name, params = _extract_tool_info(tool)
    params = params if params else {"type": "object", "properties": {}}
    return {
        "properties": {
            "name": {"type": "string", "enum": [name]},
            "parameters": params,
        },
        "required": ["name", "parameters"],
    }


def _get_tool_schema_defs(
    tools: list[Tool | ChatCompletionToolsParam],
) -> dict:
    all_defs: dict[str, dict[str, Any]] = {}
    for tool in tools:
        _, params = _extract_tool_info(tool)
        if params is None:
            continue
        defs = params.pop("$defs", {})
        for def_name, def_schema in defs.items():
            if def_name in all_defs and all_defs[def_name] != def_schema:
                raise ValueError(
                    f"Tool definition '{def_name}' has multiple schemas, "
                    "which is not supported."
                )
            all_defs[def_name] = def_schema
    return all_defs


def _get_json_schema_from_tools(
    tools: list[Tool | ChatCompletionToolsParam],
) -> dict:
    json_schema = {
        "type": "array",
        "minItems": 1,
        "items": {
            "type": "object",
            "anyOf": [_get_tool_schema_from_tool(tool) for tool in tools],
        },
    }
    json_schema_defs = _get_tool_schema_defs(tools)
    if json_schema_defs:
        json_schema["$defs"] = json_schema_defs
    return json_schema


def get_json_schema_from_tools(
    tool_choice: str | ToolChoiceFunction | ChatCompletionNamedToolChoiceParam,
    tools: list[FunctionTool | ChatCompletionToolsParam] | None,
) -> str | dict | None:
    # tool_choice: "none"
    if tool_choice in ("none", None) or tools is None:
        return None
    # tool_choice: Forced Function (Responses)
    if (not isinstance(tool_choice, str)) and isinstance(
        tool_choice, ToolChoiceFunction
    ):
        tool_name = tool_choice.name
        tool_map = {tool.name: tool for tool in tools if isinstance(tool, FunctionTool)}
        if tool_name not in tool_map:
            raise ValueError(f"Tool '{tool_name}' has not been passed in `tools`.")
        return tool_map[tool_name].parameters
    # tool_choice: Forced Function (ChatCompletion)
    if (not isinstance(tool_choice, str)) and isinstance(
        tool_choice, ChatCompletionNamedToolChoiceParam
    ):
        tool_name = tool_choice.function.name
        tool_map = {
            tool.function.name: tool
            for tool in tools
            if isinstance(tool, ChatCompletionToolsParam)
        }
        if tool_name not in tool_map:
            raise ValueError(f"Tool '{tool_name}' has not been passed in `tools`.")
        return tool_map[tool_name].function.parameters
    # tool_choice: "required"
    if tool_choice == "required":
        return _get_json_schema_from_tools(tools)
    # tool_choice: "auto"
    return None
